[AWS IoT Core] AWS IoT Device SDK v2 for Python を使用して MQTT v5 で新たに拡張されたプロパティを送受信してみました
1 はじめに
re:Invent 2022 で AWS IoT Core の MQTT v5 対応が発表され、引き続き、Python の SDK でも「MQTT v 5対応」がリリースされました。
https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/documents/MQTT5.md
今回は、AWS IoT Device SDK v2 for Python を使用して、 MQTT v5 で新たに拡張されたプロパティを Publish/Subscribe してみました。
注:2022/12/6 現在 AWS IoT Device SDK v2 for Python は、Developer Preview となっています。
2 mqtt5.PublishPacket
Python SDK での publish は、以下のような形で実装されますが、ここでは、送信内容を表現するオブジェクトとして、mqtt5.PublishPacket クラスが使用されています。
publish_future = client.publish(mqtt5.PublishPacket( topic=topic, payload=payload, qos=mqtt5.QoS.AT_LEAST_ONCE ))
SDK のawscrt/mqtt5.pyを確認すると、PublishPacketクラスは、以下のようになっていました。 拡張プロパティには、それぞれ名前が付与され、クラス変数として定義されています。
class PublishPacket: """Data model of an `MQTT5 PUBLISH `_ packet Args: payload (Any): The payload of the publish message. qos (QoS): The MQTT quality of service associated with this PUBLISH packet. retain (bool): True if this is a retained message, false otherwise. topic (str): The topic associated with this PUBLISH packet. payload_format_indicator (PayloadFormatIndicator): Property specifying the format of the payload data. The mqtt5 client does not enforce or use this value in a meaningful way. message_expiry_interval_sec (int): Sent publishes - indicates the maximum amount of time allowed to elapse for message delivery before the server should instead delete the message (relative to a recipient). Received publishes - indicates the remaining amount of time (from the server's perspective) before the message would have been deleted relative to the subscribing client. If left None, indicates no expiration timeout. topic_alias (int): An integer value that is used to identify the Topic instead of using the Topic Name. response_topic (str): Opaque topic string intended to assist with request/response implementations. Not internally meaningful to MQTT5 or this client. correlation_data (Any): Opaque binary data used to correlate between publish messages, as a potential method for request-response implementation. Not internally meaningful to MQTT5. subscription_identifiers (Sequence[int]): The subscription identifiers of all the subscriptions this message matched. content_type (str): Property specifying the content type of the payload. Not internally meaningful to MQTT5. user_properties (Sequence[UserProperty]): List of MQTT5 user properties included with the packet. """ payload: Any = "" # Unicode objects are converted to C strings using 'utf-8' encoding qos: QoS = QoS.AT_MOST_ONCE retain: bool = False topic: str = "" payload_format_indicator: PayloadFormatIndicator = None message_expiry_interval_sec: int = None topic_alias: int = None response_topic: str = None correlation_data: Any = None # Unicode objects are converted to C strings using 'utf-8' encoding subscription_identifiers: 'Sequence[int]' = None # ignore attempts to set but provide in received packets content_type: str = None user_properties: 'Sequence[UserProperty]' = None
また、user_propertiesは、UserPropertyクラスの配列となっています。
class UserProperty: """MQTT5 User Property Args: name (str): Property name value (str): Property value """ name: str = None value: str = None
3 コード
サンプルとして書いてみたコードです。
接続後に、各種のプロパティを設定したメッセージを1個送信し、同時に Subscribe で受け取ってデコードして表示しています。
import os import time import json from concurrent.futures import Future from awsiot import mqtt5_client_builder from awscrt import mqtt5 endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" port = 8883 dir = os.path.dirname(os.path.abspath(__file__)) certs = { "cafile": "{}/certificates/AmazonRootCA1.pem".format(dir), "certfile": "{}/certificates/client-cert.pem".format(dir), "keyfile": "{}/certificates/private-key.pem".format(dir), } client_id = "client_id" topic = "sensor/device01" payload = "message" TIMEOUT = 100 future_connection_success = Future() def on_publish_received(publish_packet_data): publish_packet = publish_packet_data.publish_packet print("on_message topic:{} payload:{}".format(publish_packet.topic, publish_packet.payload)) #print("{}".format(publish_packet_data)) DEBUG用 print("response_topic:{}".format(publish_packet.response_topic)) print("content_type:{}".format(publish_packet.content_type)) print("message_expiry_interval_sec:{}".format(publish_packet.message_expiry_interval_sec)) print("correlation_data:{}".format(publish_packet.correlation_data)) print("payload_format_indicator:{}".format(publish_packet.payload_format_indicator)) print("user_properties") for user_property in publish_packet.user_properties: print(" name:{} value:{}".format(user_property.name, user_property.value)) def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData): print("on_connect_success") global future_connection_success future_connection_success.set_result(lifecycle_connect_success_data) if __name__ == '__main__': client = mqtt5_client_builder.mtls_from_path( endpoint=endpoint, port=port, cert_filepath=certs["certfile"], pri_key_filepath=certs["keyfile"], ca_filepath=certs["cafile"], on_publish_received=on_publish_received, on_lifecycle_connection_success=on_lifecycle_connection_success, client_id=client_id) client.start() lifecycle_connect_success_data = future_connection_success.result(TIMEOUT) connack_packet = lifecycle_connect_success_data.connack_packet negotiated_settings = lifecycle_connect_success_data.negotiated_settings # Subscribe subscribe_future = client.subscribe(subscribe_packet=mqtt5.SubscribePacket( subscriptions=[ mqtt5.Subscription( topic_filter=topic, qos=mqtt5.QoS.AT_LEAST_ONCE) ] )) subscribe_future.result(TIMEOUT) # Publish publish_future = client.publish(mqtt5.PublishPacket( topic=topic, payload=payload, user_properties=[ mqtt5.UserProperty(name="key1",value="value1"), mqtt5.UserProperty(name="key2",value="value2"), mqtt5.UserProperty(name="key3",value="value3") ], qos=mqtt5.QoS.AT_LEAST_ONCE, response_topic="response/device01", content_type="test/plain", message_expiry_interval_sec=120, correlation_data = json.dumps({"seq":100}).encode('utf-8'), payload_format_indicator = 1 )) publish_completion_data = publish_future.result(TIMEOUT) time.sleep(1) # Disconnect client.stop()
4 実行結果
実行すると、以下のようになります。
% python3 index.py on_connect_success on_message topic:sensor/device01 payload:b'message' response_topic:response/device01 content_type:test/plain message_expiry_interval_sec:119 correlation_data:{"seq": 100} payload_format_indicator:1 user_properties name:key1 value:value1 name:key2 value:value2 name:key3 value:value3
このメッセージを MQTT テストクライアントでサブスクライブした場合も、Properties で内容を確認できます。
5 最後に
今回は、AWS IoT Device SDK v2 for Python を使用して、 MQTT v5 で新たに拡張されたプロパティを Publish/Subscribe してみました。
昨日まで使ってきた、phao.mqttとは、ちょっと違った形になりますが、クラス定義が分かりやすいと感じました。 ドキュメントは、まだ、現時点でそこまで詳しくないですが、コードを見れば特に問題なく実装できそうです。
6 参考リンク
[AWS IoT Core] MQTT v5 を使用してリクエスト・レスポンス パターンを実装して見ました
[AWS IoT Core] MQTT v5 を使用してユーザープロパティを実装して見ました
[AWS IoT Core] MQTT v5 を使用してトピック・エイリアスを実装して見ました
[AWS IoT Core] MQTT v5 を使用してメッセージ及び、セッション有効期限とクリーンスタートを実装して見ました
[AWS IoT Core] MQTT v5 を使用してレスポンスコードの確認を実装して見ました
[AWS IoT Core] MQTT v5 を使用してフォーマット識別要素で判別する Payload のパースを実装して見ました
[AWS IoT Core] MQTT v5 に対応した 「MQTT テストクライアント」の動作を確認して見ました
[AWS IoT Core] MQTT v5 で追加されたユーザープロパティを ルール で取得して Lambda で使用してみました
[AWS IoT Core] AWS IoT Device SDK v2 for Python で MQTT5 のサポートが始まりました (Developer Preview)
[AWS IoT Core] MQTT v5 で新たに追加されたプロパティ値を Republish で追加してみました